[Fix #1329] Adding iterations to TaskContext#1337
[Fix #1329] Adding iterations to TaskContext#1337fjtirado merged 2 commits intoserverlessworkflow:mainfrom
Conversation
4e91f5f to
59be63b
Compare
There was a problem hiding this comment.
Pull request overview
Adds an iteration counter to TaskContext to help distinguish repeated executions of the same task position (e.g., looped/switch tasks), and persists/restores that value through the persistence layer.
Changes:
- Add
iterationstoTaskContextand populate it when creating a new task execution context. - Persist
iterationsfor completed tasks via a newVERSION_2encoding and propagate it throughCompletedTaskInfo. - Restore persisted
iterationswhen replaying completed tasks from persistence.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceTransaction.java | Bumps task-completed serialization to v2 and writes/reads iterations. |
| impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java | Restores iterations from persisted CompletedTaskInfo into TaskContext. |
| impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/CompletedTaskInfo.java | Extends persisted completed-task metadata with an iterations field (defaulting to 1 for older versions). |
| impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java | Assigns iterations when constructing TaskContext and exposes an iterations() accessor. |
| impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java | Stores iterations on the task execution context and supports copy/restore via getter/setter. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
bb168e7 to
677b1e9
Compare
Signed-off-by: fjtirado <ftirados@redhat.com>
ba91735 to
f0cbc8a
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 11 out of 11 changed files in this pull request and generated 7 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| CompletableFuture<?> result = CompletableFuture.completedFuture(null); | ||
| for (Collection<WorkflowExecutionCompletableListener> listeners : | ||
| workflowContext.definition().application().listenersByPriority()) { | ||
| result = | ||
| result.thenCompose( | ||
| __ -> | ||
| CompletableFuture.allOf( | ||
| listeners.stream() | ||
| .map( | ||
| v -> | ||
| function | ||
| .apply(v) | ||
| .exceptionally( | ||
| ex -> { | ||
| logger.error("Error while executing listener", ex); | ||
| return null; | ||
| })) | ||
| .toArray(CompletableFuture[]::new))); | ||
| } | ||
| return result; |
There was a problem hiding this comment.
I realized that listeners with different priority should not be executed concurrently (it kills the purpose of priority). listeners with higher priority should be completed before listeners with lower priority starts execution.
Since the list of listeners is not mutable once the application has been created, they are grouped by prioriry at application creation time, so there is not need to group them for every application change.
| if (info.tasks().isEmpty()) { | ||
| return; | ||
| } |
There was a problem hiding this comment.
This is an optimization I forgot to add previously, once all persisted task are processed there is not point on performing a remove and two intances of
| PersistenceTaskInfo taskInfo = info.tasks().remove(context.position().jsonPointer()); | ||
| if (taskInfo == null) { | ||
| return; | ||
| } |
There was a problem hiding this comment.
Here is the same, there is not point in performing two instanceOf if the task is not in the map
| Collection<WorkflowExecutionCompletableListener> listeners) { | ||
| if (listeners.isEmpty()) { | ||
| return List.of(); | ||
| } | ||
| List<Collection<WorkflowExecutionCompletableListener>> result = new ArrayList<>(); | ||
| Iterator<WorkflowExecutionCompletableListener> iter = listeners.iterator(); | ||
| List<WorkflowExecutionCompletableListener> currentList = new ArrayList<>(); | ||
| WorkflowExecutionCompletableListener currentListener = iter.next(); | ||
| int currentPriority = currentListener.priority(); | ||
| currentList.add(currentListener); | ||
| while (iter.hasNext()) { | ||
| currentListener = iter.next(); | ||
| if (currentListener.priority() != currentPriority) { | ||
| result.add(currentList); | ||
| currentList = new ArrayList<>(); | ||
| currentPriority = currentListener.priority(); | ||
| } | ||
| currentList.add(currentListener); | ||
| } | ||
| if (!currentList.isEmpty()) { | ||
| result.add(currentList); | ||
| } | ||
| return result; |
There was a problem hiding this comment.
This is grouping listeners by priority.
Im using a list rather than map, since we do not really care about the priority number.
Since the source list is already sorted by priority, this old and simple algorithm will work.
It is done here because the list of listener is not mutable once the application is started to avoid repeating the grouping for every modification, which will negatively affect performance
|
|
||
| int iteration(); | ||
|
|
||
| short retryAttempt(); |
There was a problem hiding this comment.
This was a leftover from previous PR, retryAttemp make sense at interfaz level
e7e8da6 to
153612a
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 11 out of 11 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Signed-off-by: fjtirado <ftirados@redhat.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 11 out of 11 changed files in this pull request and generated 1 comment.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Fix #1329